#include "posix_queue.hpp"

#include <time.h>

#include "topconn_string.hpp"

namespace topconn
{
    /// Runs a shell command and captures the first line of its standard output.
    ///
    /// @param cmdbuf  Command line handed to popen(); must not be null.
    /// @param outbuf  Destination buffer; must not be null. Receives the first
    ///                line read from the command (including its trailing newline,
    ///                if any) as a NUL-terminated string, or an empty string when
    ///                nothing could be read.
    /// @param outsize Capacity of outbuf in bytes. Defaults to 128, the size of
    ///                the buffers the two-argument callers in this file pass.
    ///                Output longer than outsize - 1 characters is truncated.
    /// @return 0 on success (even if the command printed nothing), -1 if an
    ///         argument is invalid, -2 if popen() failed.
    static int my_popen(const char* cmdbuf, char* outbuf, size_t outsize = 128)
    {
        if (cmdbuf == nullptr || outbuf == nullptr || outsize == 0) return -1;
        outbuf[0] = '\0';

        FILE* fp = popen(cmdbuf, "r");
        if (fp == nullptr)  {
            printf("popen cmd error:%d\n", errno);
            return -2;
        }

        // Only the first line is wanted, so a single fgets() suffices. Looping on
        // !feof() would spin forever if the stream hit a read error, because
        // feof() stays false in that case.
        char resbuf[256] = { 0 };
        if (fgets(resbuf, sizeof(resbuf) - 1, fp) == nullptr)
            resbuf[0] = '\0';

        // Bounded copy: never write more than the caller's buffer can hold.
        size_t len = strlen(resbuf);
        if (len >= outsize)
            len = outsize - 1;
        memcpy(outbuf, resbuf, len);
        outbuf[len] = '\0';

        // pclose() reaps the child process; its exit status is not used.
        pclose(fp);
        return 0;
    }

    /// Creates a queue object that is not yet attached to any message queue.
    ///
    /// The descriptor is set to -1, the timeout to 0, the stored path is cleared
    /// and the creation attributes are preset to POSIX_DEFAULT_MAXMSG /
    /// POSIX_DEFAULT_MSGSIZE.
    CPosixQueue::CPosixQueue()
        : m_mqid(-1)
        , m_timeout(0)
    {
        memset(m_mqpath, 0, sizeof(m_mqpath));
        // Zero the whole attribute block first so the fields that are not set
        // explicitly below (e.g. mq_flags, mq_curmsgs) never hold indeterminate values.
        memset(&m_attr, 0, sizeof(m_attr));
        m_attr.mq_maxmsg = POSIX_DEFAULT_MAXMSG;
        m_attr.mq_msgsize = POSIX_DEFAULT_MSGSIZE;
    }

    /// Creates a queue object around an already opened message queue descriptor.
    ///
    /// @param mqid Descriptor to adopt. The destructor of this class closes the
    ///             stored descriptor, so the object takes over responsibility for it.
    ///
    /// The timeout is set to 0, the stored path is cleared and the creation
    /// attributes are preset to POSIX_DEFAULT_MAXMSG / POSIX_DEFAULT_MSGSIZE.
    CPosixQueue::CPosixQueue(mqd_t mqid)
        : m_mqid(mqid)
        , m_timeout(0)
    {
        memset(m_mqpath, 0, sizeof(m_mqpath));
        // Zero the whole attribute block first so the fields that are not set
        // explicitly below (e.g. mq_flags, mq_curmsgs) never hold indeterminate values.
        memset(&m_attr, 0, sizeof(m_attr));
        m_attr.mq_maxmsg = POSIX_DEFAULT_MAXMSG;
        m_attr.mq_msgsize = POSIX_DEFAULT_MSGSIZE;
    }

    /// Closes the message queue descriptor held by this object, if any.
    ///
    /// The queue itself is not unlinked; only this descriptor is released.
    CPosixQueue::~CPosixQueue()
    {
        // -1 is the "no descriptor" marker used by the constructors and by
        // mq_open() on failure; 0 is a legitimate descriptor value and must be
        // closed as well, otherwise it leaks.
        if (m_mqid >= 0)
        {
            mq_close(m_mqid);
            m_mqid = -1;
        }
    }

    /// Sets the maximum number of messages (mq_maxmsg) used when the queue is created.
    ///
    /// The requested value is compared with the number read from
    /// /proc/sys/fs/mqueue/msg_max. If that number is smaller than the request,
    /// 10 is used instead of the request. Text that atoi() cannot parse counts
    /// as 0, so in that case any non-zero request is replaced by 10 as well.
    ///
    /// @param queueSize Requested queue capacity in messages.
    void CPosixQueue::setQsize(size_t queueSize)
    {
        char limitText[128] = { 0 };
        const bool limitRead =
            (my_popen("cat /proc/sys/fs/mqueue/msg_max", limitText) == 0);

        if (limitRead)
        {
            // presumably strips whitespace such as the trailing newline - confirm in topconn_string
            topconn_string::delspace(limitText);

            const size_t systemLimit = static_cast<size_t>(atoi(limitText));
            if (systemLimit < queueSize)
            {
                queueSize = 10;
            }
        }

        m_attr.mq_maxmsg = queueSize;
    }

    /// Sets the maximum message size (mq_msgsize) used when the queue is created.
    ///
    /// The requested value is compared with the number read from
    /// /proc/sys/fs/mqueue/msgsize_max. If that number is smaller than the
    /// request, 8192 is used instead of the request. Text that atoi() cannot
    /// parse counts as 0, so in that case any non-zero request is replaced by
    /// 8192 as well.
    ///
    /// @param msgSize Requested maximum message size in bytes.
    void CPosixQueue::setMsize(size_t msgSize)
    {
        char limitText[128] = { 0 };
        const bool limitRead =
            (my_popen("cat /proc/sys/fs/mqueue/msgsize_max", limitText) == 0);

        if (limitRead)
        {
            // presumably strips whitespace such as the trailing newline - confirm in topconn_string
            topconn_string::delspace(limitText);

            const size_t systemLimit = static_cast<size_t>(atoi(limitText));
            if (systemLimit < msgSize)
            {
                msgSize = 8192;
            }
        }

        m_attr.mq_msgsize = msgSize;
    }

    /// Opens the POSIX message queue named mqpath, creating it with the
    /// attributes held in m_attr if it does not exist yet, and stores both the
    /// path and the resulting descriptor in this object.
    ///
    /// @param mqpath   Queue name passed to mq_open(). Must be non-null and short
    ///                 enough to fit into m_mqpath including the terminator.
    /// @param isUnlink Currently unused by this function.
    ///                 NOTE(review): the name suggests the queue should be
    ///                 unlinked before it is (re)created - confirm the intended
    ///                 behaviour with the callers.
    /// @return The queue descriptor on success; -1 on failure with errno set
    ///         (EINVAL for a null path, ENAMETOOLONG for a path that does not
    ///         fit, otherwise the value left by mq_open()).
    int  CPosixQueue::Msgget(const char* mqpath, bool isUnlink)
    {
        if (mqpath == nullptr)
        {
            errno = EINVAL;
            return -1;
        }
        // Reject paths that would overflow the fixed-size member buffer instead
        // of copying them blindly.
        if (strlen(mqpath) >= sizeof(m_mqpath))
        {
            errno = ENAMETOOLONG;
            return -1;
        }
        strcpy(m_mqpath, mqpath);

        m_mqid = mq_open(mqpath, O_RDWR | O_CREAT, 0666, &m_attr);
        if (m_mqid < 0 && errno == EEXIST)
        {
            // The queue already exists: attach to it without trying to create
            // it again (repeating the identical call could not succeed).
            m_mqid = mq_open(mqpath, O_RDWR);
        }
        if (m_mqid < 0)
        {
            return -1;
        }
        return m_mqid;
    }

    /// Sends one message with priority 0 on the queue held by this object.
    ///
    /// @param sndbuf  Message payload; may only be null when sndsize is 0.
    /// @param sndsize Payload length in bytes; must not be negative.
    /// @param isBlock true to wait for free space when the queue is full, false
    ///                to fail immediately in that situation. The descriptor's
    ///                O_NONBLOCK flag is switched to match on every call.
    /// @return 0 on success, -1 on failure (errno is set by the failing call).
    ///
    /// When isBlock is true, m_timeout is greater than 0 and mq_timedsend() is
    /// available, the wait is limited to m_timeout seconds.
    int  CPosixQueue::Msgsnd(const void* sndbuf, int sndsize, bool isBlock)
    {
        if (sndsize < 0 || (sndbuf == nullptr && sndsize > 0))
        {
            errno = EINVAL;
            return -1;
        }

        mq_attr mqAttr;
        if (mq_getattr(m_mqid, &mqAttr) != 0)
        {
            // Without valid attributes the flag handling below would read
            // uninitialised memory.
            return -1;
        }

        // Keep the descriptor's blocking mode in step with the request, in both
        // directions, so an earlier non-blocking call does not leave later
        // blocking calls non-blocking.
        const bool isNonBlocking = (mqAttr.mq_flags & O_NONBLOCK) != 0;
        if (isBlock == isNonBlocking)
        {
            mqAttr.mq_flags = isBlock ? 0 : O_NONBLOCK;
            if (CPosixQueue::MsgCtl(m_mqid, POSIX_SET, &mqAttr) != 0)
            {
                return -1;
            }
        }

        const char* payload = static_cast<const char*>(sndbuf);
        const size_t length = static_cast<size_t>(sndsize);
        int rc = -1;
#ifdef __USE_XOPEN2K
        if (isBlock && m_timeout > 0)
        {
            // mq_timedsend() takes an absolute CLOCK_REALTIME deadline, not a
            // relative duration, so build it from the current time.
            struct timespec deadline;
            if (clock_gettime(CLOCK_REALTIME, &deadline) != 0)
            {
                return -1;
            }
            deadline.tv_sec += m_timeout;
            rc = mq_timedsend(m_mqid, payload, length, 0, &deadline);
        }
        else
#endif
        {
            rc = mq_send(m_mqid, payload, length, 0);
        }
        return (rc == 0) ? 0 : -1;
    }

    /// Receives one message from the queue held by this object.
    ///
    /// @param rcvbuf  Destination buffer; must not be null. The queue's
    ///                mq_msgsize is passed to the receive call as the buffer
    ///                length, so the buffer must provide at least that many bytes.
    /// @param isBlock true to wait for a message when the queue is empty, false
    ///                to fail immediately in that situation. The descriptor's
    ///                O_NONBLOCK flag is switched to match on every call.
    /// @return 0 when a message was received (its length is not reported),
    ///         -1 on failure with errno set. Failures other than EINTR are
    ///         also printed to stdout.
    ///
    /// When isBlock is true, m_timeout is greater than 0 and mq_timedreceive()
    /// is available, the wait is limited to m_timeout seconds.
    int  CPosixQueue::Msgrcv(char* rcvbuf, bool isBlock)
    {
        if (rcvbuf == nullptr)
        {
            errno = EINVAL;
            return -1;
        }

        mq_attr mqAttr;
        if (mq_getattr(m_mqid, &mqAttr) != 0)
        {
            // mq_msgsize and mq_flags would be garbage below.
            return -1;
        }

        // Keep the descriptor's blocking mode in step with the request, in both
        // directions, so an earlier non-blocking call does not leave later
        // blocking calls non-blocking.
        const bool isNonBlocking = (mqAttr.mq_flags & O_NONBLOCK) != 0;
        if (isBlock == isNonBlocking)
        {
            mqAttr.mq_flags = isBlock ? 0 : O_NONBLOCK;
            if (CPosixQueue::MsgCtl(m_mqid, POSIX_SET, &mqAttr) != 0)
            {
                return -1;
            }
        }

        ssize_t received = -1;
#ifdef __USE_XOPEN2K
        if (isBlock && m_timeout > 0)
        {
            // mq_timedreceive() takes an absolute CLOCK_REALTIME deadline, not a
            // relative duration, so build it from the current time.
            struct timespec deadline;
            if (clock_gettime(CLOCK_REALTIME, &deadline) != 0)
            {
                return -1;
            }
            deadline.tv_sec += m_timeout;
            received = mq_timedreceive(m_mqid, rcvbuf, mqAttr.mq_msgsize, nullptr, &deadline);
        }
        else
#endif
        {
            received = mq_receive(m_mqid, rcvbuf, mqAttr.mq_msgsize, nullptr);
        }

        // Both receive calls return the message length on success and -1 on
        // error, so only a negative result is a failure.
        if (received < 0)
        {
            if (errno != EINTR)
                printf("receive message failed:%s\n", strerror(errno));
            return -1;
        }
        return 0;
    }

    /// Reports how many messages are currently waiting in a queue.
    ///
    /// @param mqid Descriptor of the queue to inspect.
    /// @return The queue's mq_curmsgs value, or -1 if the attributes could not
    ///         be obtained through MsgCtl(POSIX_STAT).
    int CPosixQueue::MsgDepth(int mqid)
    {
        mq_attr snapshot;
        const int rc = CPosixQueue::MsgCtl(mqid, POSIX_STAT, &snapshot);
        return (rc == 0) ? snapshot.mq_curmsgs : -1;
    }

    /// Performs a control operation on a message queue.
    ///
    /// @param mqid   Queue descriptor (used by POSIX_STAT and POSIX_SET).
    /// @param cmd    Operation selector:
    ///               - POSIX_STAT: read the queue attributes into *pvArgs (mq_attr*).
    ///               - POSIX_SET:  apply the attributes in *pvArgs (mq_attr*) with mq_setattr().
    ///               - POSIX_RMID: unlink the queue whose name pvArgs (char*) points to.
    /// @param pvArgs Operation argument as described above; must not be null for
    ///               the three operations listed.
    /// @return 0 on success, -1 if the argument is null or the underlying call
    ///         failed (errno is set and a message is printed to stdout). An
    ///         unrecognised cmd is a no-op that returns 0.
    int CPosixQueue::MsgCtl(int mqid, int cmd, void* pvArgs)
    {
        const bool knownCmd = (cmd == POSIX_STAT || cmd == POSIX_SET || cmd == POSIX_RMID);
        if (knownCmd && pvArgs == nullptr)
        {
            errno = EINVAL;
            return -1;
        }

        if (cmd == POSIX_STAT)
        {
            mq_attr* pmqAttr = static_cast<mq_attr*>(pvArgs);
            if (mq_getattr(mqid, pmqAttr) < 0)
            {
                printf("get the message queue attribute error:%s\n", strerror(errno));
                return -1;
            }
        }
        else if (cmd == POSIX_SET)
        {
            const mq_attr* pmqAttr = static_cast<const mq_attr*>(pvArgs);
            // The previous attributes are not needed, so no old-attribute buffer is passed.
            if (mq_setattr(mqid, pmqAttr, nullptr) < 0)
            {
                printf("set the message queue attribute error:%s\n", strerror(errno));
                return -1;
            }
        }
        else if (cmd == POSIX_RMID)
        {
            const char* mqpath = static_cast<const char*>(pvArgs);
            if (mq_unlink(mqpath) < 0)
            {
                printf("unlink the message queue error:%s\n", strerror(errno));
                return -1;
            }
        }
        return 0;
    }
};